package com.bawei.flink.tableapi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Small Flink Table API demo: registers a {@code products} table that uses the
 * {@code mysql-cdc} connector and prints the result of selecting every column
 * from it.
 */
public class CDCDemo {

    /**
     * DDL that declares the {@code products} table (primary key {@code id},
     * declared NOT ENFORCED) and binds it to the {@code mysql-cdc} connector.
     *
     * <p>NOTE(review): host, port, user, password and database are hard-coded
     * in this statement; anyone reusing the demo has to edit them here.
     */
    private static final String CREATE_PRODUCTS_TABLE =
            "CREATE TABLE products(\n"
                    + "id INT NOT NULL,\n"
                    + "name STRING,\n"
                    + "description STRING,\n"
                    + "weight DECIMAL(10,3),\n"
                    + "PRIMARY KEY(id) NOT ENFORCED\n"
                    + ") WITH (\n"
                    + " 'connector' = 'mysql-cdc',\n"
                    + "'hostname' = 'cdh1',\n"
                    + "'port' = '3306',\n"
                    + "'username' = 'root',\n"
                    + "'password' = '123456',\n"
                    + "'database-name' = 'mydatabase',\n"
                    + "'table-name' = 'products')";

    /** Query whose result is printed by {@link #main(String[])}. */
    private static final String SELECT_ALL_PRODUCTS = "select * from products";

    /**
     * Builds the environment settings used by the demo: Blink planner,
     * streaming mode.
     *
     * <p>NOTE(review): {@code useBlinkPlanner()} is reported as deprecated in
     * newer Flink releases - confirm against the Flink version this project
     * builds with before upgrading.
     *
     * @return the settings handed to {@code TableEnvironment.create}
     */
    private static EnvironmentSettings streamingBlinkSettings() {
        return EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
    }

    /**
     * Entry point: creates the table environment, registers the
     * {@code products} table, then executes the select query and prints its
     * result.
     *
     * @param args command-line arguments; not read by this demo
     */
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(streamingBlinkSettings());

        // The table must be registered before the query below can reference it.
        tableEnv.executeSql(CREATE_PRODUCTS_TABLE);

        tableEnv.sqlQuery(SELECT_ALL_PRODUCTS)
                .execute()
                .print();
    }
}
